package com.gis507.test.interpolation;

import com.gis507.test.AISDataSort.AISDataBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.gavaghan.geodesy.Ellipsoid;
import org.gavaghan.geodesy.GeodeticCalculator;
import org.gavaghan.geodesy.GeodeticCurve;
import org.gavaghan.geodesy.GlobalCoordinates;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class AISInterReducer extends Reducer<Text,AISInterBean,Text,AISInterBean> {

    //设定的最大时间
    private static final long STANDTIME = 10;
    //设定的最大距离
    private static final double STANDSTANCE = 20;

    @Override
    protected void reduce(Text key, Iterable<AISInterBean> values, Context context) throws IOException, InterruptedException {

        //实例化一个列表，用于存放数据
        List<AISInterBean> aisList = new ArrayList<>(1000);

        //遍历values
        for (AISInterBean value : values) {

            //将value添加到列表
            aisList.add(new AISInterBean(value.getSpeed(), value.getUnixTime(), value.getLon_d(), value.getLat_d(),
                    value.getDraught(), value.getLength(), value.getWidth(), value.getCallsign(), value.getCourse(),
                    value.getRot()));

        }

        log.info("aisList的长度是：" + aisList.size());

        //所有插值得到的数据放到一个列表中
        ArrayList<AISInterBean> allAisInterList = new ArrayList<>();

        //所有满足的数据
        ArrayList<AISInterBean> rightAisList = new ArrayList<>();

        //aisList是一个放有数据的列表
        for (int i = 0; i < aisList.size() - 1; i++) {

            //判断两点之间的差是否大于规定的最大时间差
            //java中取绝对值 Math.abs()
            if (Math.abs(aisList.get(i + 1).getUnixTime() - aisList.get(i).getUnixTime()) > STANDTIME) {

                //判断是否距离超限，如果超限就插值，不超限就说明可能是停靠，就可以分段
                if (getStance(aisList.get(i), aisList.get(i + 1)) > STANDSTANCE) {

                    //往数据里插值
                    ArrayList<AISInterBean> aisInterList = insertPoint(aisList.get(i), aisList.get(i + 1));
                    //将本次插值的数据放到总的插值列表中
                    allAisInterList.addAll(aisInterList);
                } else {
                    log.info("这个点应该是个停靠点：" + aisList.get(i));
                }

            } else {
                //满足的数据列表
                rightAisList.add(new AISInterBean(aisList.get(i).getSpeed(), aisList.get(i).getUnixTime(), aisList.get(i).getLon_d(),
                        aisList.get(i).getLat_d(), aisList.get(i).getDraught(), aisList.get(i).getLength(), aisList.get(i).getWidth(),
                        aisList.get(i).getCallsign(), aisList.get(i).getCourse(), aisList.get(i).getRot()));
                log.info("符合条件的添加成功一条数据");
            }
        }

        //汇总数据
        rightAisList.addAll(allAisInterList);

        //写出
        for (AISInterBean aisInterBean : rightAisList) {

            context.write(key, aisInterBean);
        }


    }

    /**
     * 距离的计算
     */
//    public double getStance(AISInterBean ais1,AISInterBean ais2){
//
//        //Math.pow(x,y)  x的y次方
//        double lat = Math.pow((ais1.getLat_d() - ais2.getLat_d()),2);
//        double lon = Math.pow((ais1.getLon_d() - ais2.getLon_d()),2);
//        //平方根就是1/2次方
//        double stance = Math.pow((lat + lon),0.5);
//
//        return stance;
//    }

    /**
     * 求解两个坐标之间的距离
     *
     * @param gpsFrom   第一个点坐标
     * @param gpsTo     第二个点坐标
     * @param ellipsoid 使用的坐标系
     * @return
     */
    public static double getDistanceMeter(GlobalCoordinates gpsFrom, GlobalCoordinates gpsTo, Ellipsoid ellipsoid) {
        //创建GeodeticCalculator，调用计算方法，传入坐标系、经纬度用于计算距离
        GeodeticCurve geoCurve = new GeodeticCalculator().calculateGeodeticCurve(ellipsoid, gpsFrom, gpsTo);

        return geoCurve.getEllipsoidalDistance();
    }


    /**
     * 求得距离
     */
    public double getStance(AISInterBean ais1, AISInterBean ais2) {


        GlobalCoordinates source = new GlobalCoordinates(ais1.getLat_d(), ais2.getLon_d());
        GlobalCoordinates target = new GlobalCoordinates(ais2.getLat_d(), ais2.getLon_d());


        double meter = getDistanceMeter(source, target, Ellipsoid.WGS84);

        return meter;
    }


    /**
     * 插值
     */
    public ArrayList<AISInterBean> insertPoint(AISInterBean ais1, AISInterBean ais2) {

        //获得实际距离
        double stance = getStance(ais1, ais2);
        //按照距离进行插值，需要插入点的个数
        double number = Math.floor(stance / STANDSTANCE) - 1;

        //实例化一个列表，将所有添加的数据放大集合中
        ArrayList<AISInterBean> aisList = new ArrayList<>();

        //插入的点放入集合中
        for (int i = 1; i <= number; i++) {

            //speed就应该是大于ais1，并且小于ais2
            double speed = ais1.getSpeed() + (ais2.getSpeed() - ais1.getSpeed()) * i / (number + 1);

            //插入点的lat和lon
            double lat = ais1.getLat_d() + (ais2.getLat_d() - ais1.getLat_d()) * i / (number + 1);
            double lon = ais1.getLon_d() + (ais2.getLon_d() - ais1.getLon_d()) * i / (number + 1);

            //时间
            //Math.round():Double类型转换为Long
            //long转换为double,(double)long;
            Long unixTime = Math.round(ais1.getUnixTime() + (ais2.getUnixTime() - ais1.getUnixTime()) * i / (number + 1));

            //course,rot等其他字段，都采用ais1的
            //speed,unixTime,lon_d,lat_d,draught,length,width,callsign,course,rot
            String draught = ais1.getDraught();
            String length = ais1.getLength();
            String width = ais1.getWidth();
            String callsign = ais1.getCallsign();
            double course = ais1.getCourse();
            double rot = ais1.getRot();

            //封装到aisData中
            AISInterBean aisData = new AISInterBean(speed, unixTime, lon, lat, draught, length, width, callsign, course, rot);

            //输出插值得到的点
            log.info("插值得到的点：" + aisData);
            //放到列表中
            aisList.add(aisData);

        }

        //返回列表
        return aisList;

    }
}

